AWS LambdaとDynamoDB Streamsを活用してTwitter Botを作ってみた
初めに
こんにちは!classmethodの新卒エンジニアホンギです。
記事を書いているここDevelopersIOは日本のブログですが、会社にいらっしゃる様々な国の方々が
いろんな言語でブログを書いてくださって日本語以外の言語で作成されたブログも増えています。
また、今期には私を含め、6人の韓国人が入社することになり、ハングルで作成されたポストも増えています!
よって「韓国語ブログ」のタグが貼ってある記事を集めて閲覧できるようにTwitterにポスティングするシステムを構築してみました。
ポストで使用された韓国語ブログ集です!興味がありましたら是非読んでみてください!
Architecture
流れ
- RSS feedで「韓国語ブログ」タグが貼ってある記事を受け取ります。
- 最初のLambdaで要請した記事のうち、新しい記事だけをDynamoDBに入れます。
- DynamoDBに保存されたデータは、DynamoDB Streamsを通して2番目のLambdaに移動されます。
- 2番目のLambdaでDynamoDBに追加された記事をTwitter APIを使用してTwitterにポスティングします。
- CloudWatchのEvent Rulesを利用して作業スケジューラーを作成し、一定時間ごとに実行させます。
動作環境
- Lambda Runtime : Node.js 12.x
DynamoDBテーブル作成及びStream有効化
まず、この部分を構成してみましょう。
サービスでDynamoDBを選択し、TablesタブでCreate tableをクリックします。
Table nameとPrimary Keyを定義します。Table設定はdefaultにします。
Keyは重複しないカラムのスキーマに設定してください。私は重なることがないtitleに指定してみます。
ここまですると、テーブルの作成は終わりです。ここでDynamoDB Streamsを有効化させると
データの変化(生成、修正、削除)が発生した場合、イベントをお届けできます。
view typeによってstreamsの中に入るデータが変わります。
DynamoDB streamが有効化されたら、DBの設定は終わりです。
Manage StreamのView type
Keys only : 修正されたデータのKeyのみ送信
New Image : 変更後のデータのみ転送
Old Image : 変更される前のデータのみ転送
New and Old Images : DBが変更される前と後全て転送
詳細はDynamoDB Streamsで確認することができます。
1番目のLambda作成
今回はRSSデータの読み込み部分を構成してみましょう。
(韓国語ブログのRSSデータ)
サービスでLambdaを選択し、Create functionを選択して新しいlambda関数を作成します。
Lambdaサービスで使用した権限は、下記に記載されています。
コード作成
// request rss ... Dynamodb SDK使用 ... // 受信したXML形式のデータをJSONに変換させ、DynamoDBに保存します。 const params = { TableName: "PostTest", Item:JSON.parse(body).items[i], ConditionExpression: 'attribute_not_exists(title)' //Keyが重複しないもののみ追加する。 }; ddb.put(params, (err, data) => { if (err) console.log("error : " + err); else console.log(data); });
2番目のLambda作成
今度はTwitterと連動するLambda関数を構成してみましょう。
TwitterのAPIを使うためのtwitter appの登録は終わったと仮定して進めます。
2番目の関数の場合、DynamoDBに新しいオブジェクトが追加された時に実行されるようにトリガを設定する必要があります。
追加されるとDynamoDBとLambdaが連結されます。これからは、DBにイベントが起きるたびに、この関数が実行されるでしょう。
コード作成
//twitter module 使用 ... twitter token 設定 ... for (let i = 0; i < event.Records.length; i++) { if (event.Records[i].eventName === "INSERT") { let message = event.Records[i].dynamodb.NewImage["title"]; let link = event.Records[i].dynamodb.NewImage["link"]; client.post('statuses/update', {status: "#DevelopersIO" + " " + message + "\n" + link}, function(error, tweet, response) { if (!error) console.log("tweet success: " + tweet + ", response : " + response); else console.log("tweet fail: " + error); } }); }
CloudWatchでのイベントのレコードオブジェクトのログを見ると、INSERTイベントが発生していることが分かります。
これによって、生成、削除、修正された時の分岐を分けて作業することもできます。
CloudWatchで定期的に呼び出す
最後に、CloudWatchから最初のLambdaを毎日12時、18時に呼び出してみます。
CloudWatchのEventタブにあるRulesでruleを作成します。
ScheduleのCron expressionを選択し、実行する時間を表現式に合わせて作成します。
毎日12時、18時に呼び出そうとすると表現式は(0 12,18 * * ? *)になります。
しかし、呼び出している環境の時間基準はUTCに従っていますので、9時間を引き算することが必要です。(0 3,9 * * ? *)
表現式はCronを使用したスケジュール式をお読みください。
表現式を作成し、Tagetsで呼び出そうとする2番目のLambdaを指定します。
Cron表現式が紛らわしかった場合はcrontab guruのようなサイトを利用することをおすすめします。
Ruleの名前と説明を書いたら終わりです。
作成されたRuleを押して、正しく作成されたことを確認しましょう。
CloudWatchがLambdaにトリガされていることを確認できます。
テスト
これでアーキテクチャの構造通りに全てを実現しました。上手く動作しているのかテストをしてみます。
現在、私のツイッターには何の記事もありません。
DynamoDBには現在すべての記事のデータが入力されているので、私が作成した韓国語記事を消し、最初のLambdaを実行してみます。
Logを見ると、2件のツイッターリクエストが成功したことが分かります。ツイッターにも掲載されたか確認してみよう。
ツイッターにもうまく記事が掲載されたことが確認できます。
使用したリソースのRole設定
- 1番目のLambda
AWSLambdaBasicExecutionRole
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:[region]:[AccountID]:*" },{ "Effect": "Allow", "Action": ["logs:CreateLogStream", "logs:PutLogEvents"], "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/putdynamodb:*" } ] }
DynamoDBRole
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "dynamodb:PutItem", "Resource": "arn:aws:dynamodb:[region]:[AccountID]:table/PostTest" } ] }
- 2番目のLambda
AWSLambdaBasicExecutionRole リソース位置以外同じ
DynamoDBStreamRole
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams", "dynamodb:GetRecords" ], "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/postTweet:*" } ] }
単にオブジェクトを入れる権限とは異なり、StreamとShard(シャード)権限を与える必要があります。詳細はLambda実行ロール作りを参照してください。
最後に
以上で、AWSLambdaとDynamoDB Streamで記事をポスティングするサービスを構成してみました。
サーバレスに初めて接しましたが、サーバを構成する必要がなくコードが実行され、他のサービスと連結して分離するのが簡単だという点が本当に良かったです。
今後、韓国語のポストだけを集めて見ることができるDevelopers.IO KoreaブログとTwitterが公開される予定ですので、楽しみにしていてください!!
参考
DynamoDBについての詳しい文書はDynamoDB開発者ガイドをお読みください!
DynamoDBのSDK使用はDynamoDB SDKをお読みください!
DynamoDBで発生するエラーはDynamoDBエラー処理を読むと役に立ちます。